/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.tests.util.util;

import org.apache.flink.tests.util.parameters.ParameterProperty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Collection of file-related utilities.
 */
public class FileUtils {

	private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);

	private static final ParameterProperty<Path> PROJECT_ROOT_DIRECTORY = new ParameterProperty<>("rootDir", Paths::get);
	private static final ParameterProperty<Path> DISTRIBUTION_DIRECTORY = new ParameterProperty<>("distDir", Paths::get);

	/**
	 * Matches the given {@link Pattern} against all lines in the given file, and replaces all matches with the replacement
	 * generated by the given {@link Function}.
	 * All unmatched lines and provided replacements are written into the file, with the order corresponding to the
	 * original content. Newlines are automatically added to each line; this implies that an empty replacement string
	 * will result in an empty line to be written.
	 */
	public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer) throws IOException {
		final List<String> fileLines = Files.readAllLines(file);
		try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(file, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
			for (String fileLine : fileLines) {
				Matcher matcher = pattern.matcher(fileLine);
				if (matcher.matches()) {
					String replacement = replacer.apply(matcher);
					pw.println(replacement);
				} else {
					pw.println(fileLine);
				}
			}
		}
	}

	public static Path findFlinkDist() {
		Optional<Path> distributionDirectory = DISTRIBUTION_DIRECTORY.get();
		if (!distributionDirectory.isPresent()) {
			LOG.debug("The '{}' property was not set; attempting to automatically determine distribution location.", DISTRIBUTION_DIRECTORY.getPropertyName());

			Path projectRootPath;
			Optional<Path> projectRoot = PROJECT_ROOT_DIRECTORY.get();
			if (projectRoot.isPresent()) {
				// running with maven
				projectRootPath = projectRoot.get();
			} else {
				// running in the IDE; working directory is test module
				Optional<Path> projectRootDirectory = findProjectRootDirectory(Paths.get("").toAbsolutePath());
				// this distinction is required in case this class is used outside of Flink
				if (projectRootDirectory.isPresent()) {
					projectRootPath = projectRootDirectory.get();
				} else {
					throw new IllegalArgumentException(
						"The 'distDir' property was not set and the flink-dist module could not be found automatically." +
							" Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir=<path> .");
				}
			}
			Optional<Path> distribution = findDistribution(projectRootPath);
			if (!distribution.isPresent()) {
				throw new IllegalArgumentException(
					"The 'distDir' property was not set and a distribution could not be found automatically." +
						" Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir=<path> .");
			} else {
				distributionDirectory = distribution;
			}
		}
		return distributionDirectory.get();
	}

	private static Optional<Path> findProjectRootDirectory(Path currentDirectory) {
		// move up the module structure until we find flink-dist; relies on all modules being prefixed with 'flink'
		do {
			if (Files.exists(currentDirectory.resolve("flink-dist"))) {
				return Optional.of(currentDirectory);
			}
			currentDirectory = currentDirectory.getParent();
		}  while (currentDirectory.getFileName().toString().startsWith("flink"));
		return Optional.empty();
	}

	private static Optional<Path> findDistribution(Path projectRootDirectory) {
		final Path distTargetDirectory = projectRootDirectory.resolve("flink-dist").resolve("target");
		try {
			Collection<Path> paths = org.apache.flink.util.FileUtils.listFilesInDirectory(
				distTargetDirectory,
				FileUtils::isDistribution);
			if (paths.size() == 0) {
				// likely due to flink-dist not having been built
				return Optional.empty();
			}
			if (paths.size() > 1) {
				// target directory can contain distributions for multiple versions, or it's just a dirty environment
				LOG.warn("Detected multiple distributions under flink-dist/target. It is recommended to explicitly" +
					" select the distribution by setting the '{}}' property.", DISTRIBUTION_DIRECTORY.getPropertyName());
			}
			// jar should be in /lib; first getParent() returns /lib, second getParent() returns distribution directory
			return Optional.of(paths.iterator().next().getParent().getParent());
		} catch (IOException e) {
			LOG.error("Error while searching for distribution.", e);
			return Optional.empty();
		}
	}

	private static boolean isDistribution(Path path) {
		// check for `lib/flink-dist*'
		// searching for the flink-dist jar is not sufficient since it also exists in the modules 'target' directory
		return path.getFileName().toString().contains("flink-dist")
			&& path.getParent().getFileName().toString().equals("lib");
	}
}
